package com.skyline.courier.net;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractConnection implements Connection {
	private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConnection.class);
	
	private static AtomicLong SEQ = new AtomicLong(0);
	protected Map<Long, SendFuture<?>> futures = new HashMap<Long, SendFuture<?>>();
	private Integer completeTimeout;
	private Integer flowTimeout;
	
	protected SocketAddress address;
	protected String id = UUID.randomUUID().toString().replaceAll("\\-", "");
	protected FlowManager flowManager = new FlowManagerImpl();
	protected Statistic statistic;
	protected ConnectionManager connectionManager;

	private long getSeq() {
		return SEQ.getAndIncrement();
	}
	
	@Override
	public void setFlowTimeout(Integer flowTimeout) {
		this.flowTimeout = flowTimeout;
	}
	
	@Override
	public void setCompeleteTimeout(Integer completeTimeout) {
		this.completeTimeout = completeTimeout;
	}

	@Override
	public <I, O> O send(I input) throws Throwable {
		if (!isOpen()) {
			reconnect();
		}

		// 流量控制
		flowAcquire();

		try {

			LOGGER.info("向[" + address + "]发送消息[" + input + "]");
			statistic.sentMsg(address, System.currentTimeMillis());

			O output = doSend(input);

			statistic.receivedMsg(address, System.currentTimeMillis());
			LOGGER.info("从[" + address + "]收到消息[" + input + "]");

			return output;

		} catch (Throwable e) {
			LOGGER.info("向[" + address + "]发送（或接收）消息失败");
			throw e;
		} finally {
			// 释放流量
			flowRelease();
		}

	}
	
	protected <I, O> O doSend(I input) throws Throwable {
		
		// 存储InvokeFuture
		final SendFuture<O> sendFuture = new SendFuture<O>();
		TransferBean transferBean = createTransferBean(input);
		futures.put(transferBean.getSeq(), sendFuture);
		
		doSend(transferBean, sendFuture);
		
		return getResult(sendFuture);
	}

	private <I> TransferBean createTransferBean(I input) {
		TransferBean request = new TransferBean();
		final long seq = getSeq();
		request.setSeq(seq);
		request.setTarget(input);
		return request;
	}
	
	protected abstract <O> void doSend(TransferBean input, final SendFuture<O> sendFuture);
	
	private <O> O getResult(final SendFuture<O> sendFuture) throws Throwable {
		O retObj = null;
		if (completeTimeout != null && completeTimeout > 0) {
			// 等待返回，直到Response返回或超时
			retObj = sendFuture.getResult(completeTimeout, TimeUnit.MILLISECONDS);
		} else {
			// 一直等待，直到Response返回
			retObj = sendFuture.getResult();
		}
		
		return retObj;
	}

	@Override
	public void reconnect() {
		LOGGER.info("connection[" + this + "]开始重新连接到服务端...");
		doReconnect();
		connectionManager.removeDisconnect(address);
		connectionManager.addConnected(address, this);
		LOGGER.info("connection[" + this + "]重新连接到服务端成功");
	}

	protected abstract void doReconnect();

	@Override
	public void close() {
		LOGGER.info("connection[" + this + "]即将断开链接...");
		doClose();
		connectionManager.removeConnected(address);
		connectionManager.addDisconnectAddress(address, this);
		LOGGER.info("connection[" + this + "]断开链接成功");
	}

	protected abstract void doClose();

	@Override
	public void setRemoteAddress(SocketAddress address) {
		this.address = address;
	}

	@Override
	public void setConnectionManager(ConnectionManager connectionManager) {
		this.connectionManager = connectionManager;
	}

	@Override
	public void setStatistic(Statistic statistic) {
		this.statistic = statistic;
	}

	@Override
	public void setFlowThreshold(int threshold) {
		if (threshold > 0) {
			flowManager.setThreshold(threshold);
		} else {
			flowManager = null;
		}
	}

	@Override
	public String toString() {
		return "{id:" + id + ", address:" + address + "}@Connection";
	}

	protected void flowRelease() {
		// 流量控制
		if (flowManager != null) {
			flowManager.release();
		}
	}

	protected void flowAcquire() {
		// 流量控制
		if (flowManager != null) {
			if (!flowManager.acquire(1, flowTimeout)) {
				throw new FlowExceededException("flow exceeded");
			}
		}
	}
}
